package org.yunai.yfserver.schedule;

import org.apache.commons.lang.exception.ExceptionUtils;
import org.quartz.*;
import org.quartz.impl.StdSchedulerFactory;
import org.slf4j.Logger;
import org.yunai.yfserver.common.LoggerFactory;
import org.yunai.yfserver.message.schedule.ScheduledCronMessage;
import org.yunai.yfserver.message.schedule.ScheduledMessage;
import org.yunai.yfserver.server.IMessageProcessor;
import org.yunai.yfserver.time.TimeService;

import java.util.Date;

/**
 * 定时任务管理器实现
 * User: yunai
 * Date: 13-5-24
 * Time: 下午9:06
 */
public class ScheduleServiceImpl implements ScheduleService {

    private static final Logger LOGGER = LoggerFactory.getLogger(LoggerFactory.Logger.schedule, ScheduleServiceImpl.class);

    /**
     * 时间服务
     */
    private final TimeService timeService;
    /**
     * 消息队列
     */
    private final IMessageProcessor messageProcessor;
    /**
     * 是否停止
     * TODO shutdown方法?
     */
    private volatile boolean shutdown = false;
    /**
     * quartz调度器
     */
    private final Scheduler scheduler;

    public ScheduleServiceImpl(TimeService timeService, IMessageProcessor messageProcessor, Scheduler scheduler) throws SchedulerException {
        LOGGER.info("[constructor] [schedule service start begin]");
        this.timeService = timeService;
        this.messageProcessor = messageProcessor;
        this.scheduler = new StdSchedulerFactory().getScheduler();
        this.scheduler.start();
        LOGGER.info("[constructor] [schedule service start success]");
    }

    @Override
    public void schedule(ScheduledCronMessage msg) {
        LOGGER.debug("[schedule] [msg:{}] [cron:{}].", msg.getClass(), msg.getCron());
        // 设置消息状态为[等待状态]
        msg.setState(ScheduledMessage.STATE_WAITING);
        // 加入到Quartz调度
        MessageRunner runner = new MessageRunner(msg);
        JobDetail job = JobBuilder.newJob(MessageJob.class).build();
        job.getJobDataMap().put("runner", runner);
        msg.setJobKey(job.getKey());
        msg.setScheduler(scheduler);
        CronTrigger trigger = TriggerBuilder.newTrigger().withSchedule(CronScheduleBuilder.cronSchedule(msg.getCron())).build();
        try {
            Date ft = scheduler.scheduleJob(job, trigger);
            LOGGER.info("[schedule] [jobKey({}) has bean scheduled run at: {} and repeat based on expression({})].", job.getKey(), ft, trigger.getCronExpression());
        } catch (SchedulerException e) {
            LOGGER.error("[schedule] [msg:{}] [cron:{}] [exception:{}].", msg.getClass(), msg.getCron(), ExceptionUtils.getStackTrace(e));
        }
    }

    /**
     * 定时消息执行者,向{@link #messageProcessor}添加消息
     */
    private final class MessageRunner implements Runnable {

        private final ScheduledMessage msg;

        private MessageRunner(ScheduledMessage msg) {
            this.msg = msg;
        }

        /**
         * 设置消息状态为[在队列中]，并设置触发时间，最后加入消息队列
         */
        @Override
        public void run() {
            if (shutdown || msg.isCanceled()) {
                return;
            }
            // 设置消息状态为[在消息队列中]
            msg.setState(ScheduledMessage.STATE_IN_QUEUE);
            msg.setTriggerTimestamp(timeService.now());
            // 加入消息队列
            messageProcessor.put(msg);
        }
    }

    /**
     * 消息任务，用于{@link org.quartz.core.QuartzScheduler}调度
     */
    public static final class MessageJob implements Job {

        /**
         * 由于{@link JobBuilder}不支持带参数的构造方法，所以该属性必须通过set方法赋值
         */
        private MessageRunner runner;

        public void setRunner(MessageRunner runner) {
            this.runner = runner;
        }

        @Override
        public void execute(JobExecutionContext context) throws JobExecutionException {
            runner.run();
        }
    }
}
